package com.test.demo.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Gao
 */
public class TransactionListenerLocal implements TransactionListener {

    private Map<String, Boolean> results = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("开始执行本地事务：" + o.toString());
        String orderId = o.toString();
        //模拟数据库保存(成功/失败)
        boolean result = Math.abs(Objects.hash(orderId)) % 2 == 0;
        results.put(orderId, result);
        return result ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
    }

    /**
     * 提供给事务执行状态检查的回调方法，给broker用的(异步回调）
     * 如果回查失败，消息就丢弃
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String orderId = messageExt.getKeys();
        System.out.println("执行事务回调检查： orderId:" + orderId);
        boolean rs = results.get(orderId);
        //只有成功/失败
        System.out.println("数据的处理结果：" + rs);
        return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
